Case NYC Taxi Trips

In [1]:
from glob import glob
import pandas as pd
import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession,DataFrame,functions as F
from pyspark.sql.types import *
from functools import reduce
from rich import print
import plotly.express as px
import plotly.graph_objects as go
import plotly.offline as py
from ipywidgets import widgets
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json

py.init_notebook_mode(connected=True)

Lendo as bases

In [2]:
#lendo a base de Trips
trips_path = glob("data/*.json")
trips_path.sort()
year = 2009
trips=list()

print ("Lendo as bases trips: ")
for file in trips_path:
    trip = spark.read.json(file)
    trip = trip.withColumn('year', F.lit(F.year('pickup_datetime')))
    print ("[bold]Ano: [/bold]"+str(year)+" [bold]Quantidade: [/bold]"+str(trip.count()))
    trips.append(trip)
    year +=1
Lendo as bases trips: 
Ano: 2009 Quantidade: 1000000
Ano: 2010 Quantidade: 1000000
Ano: 2011 Quantidade: 1000000
Ano: 2012 Quantidade: 1000000
In [3]:
#lendo a base Vendor Lookup
vendor = spark.read.csv('data-vendor_lookup-csv.csv', header=True)
In [4]:
#lendo a base Payment Lookup 
payment = spark.read.csv('data-payment_lookup-csv.csv', header=True)

Unindo as bases de trips em uma único dataframe

In [5]:
def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)
In [6]:
trips = unionAll(trips[0], trips[1], trips[2], trips[3])
In [7]:
print ("[bold]Colunas da tabela Trips: [/bold]: ")
print (trips.columns)
Colunas da tabela Trips: : 
[
    'dropoff_datetime',
    'dropoff_latitude',
    'dropoff_longitude',
    'fare_amount',
    'passenger_count',
    'payment_type',
    'pickup_datetime',
    'pickup_latitude',
    'pickup_longitude',
    'rate_code',
    'store_and_fwd_flag',
    'surcharge',
    'tip_amount',
    'tolls_amount',
    'total_amount',
    'trip_distance',
    'vendor_id',
    'year',
]
In [8]:
print ("[bold]Colunas da tabela Vendor: [/bold]: ")
print (vendor.columns)
Colunas da tabela Vendor: : 
['vendor_id', 'name', 'address', 'city', 'state', 'zip', 'country', 'contact', 'current']
In [9]:
print ("[bold]Colunas da tabela Payment: [/bold]: ")
print (payment.columns)
Colunas da tabela Payment: : 
['A', 'B']
In [10]:
print ("Total da tabela Trips concatenada anos 2009 a 2012: ", trips.count())
Total da tabela Trips concatenada anos 2009 a 2012:  4000000
In [11]:
print ("Total da tabela Payment: ", payment.count())
Total da tabela Payment:  3289
In [12]:
print ("Total da tabela Vendor: ", vendor.count())
Total da tabela Vendor:  5

Verificando os dataframes

In [13]:
trips.limit(5).toPandas()
Out[13]:
dropoff_datetime dropoff_latitude dropoff_longitude fare_amount passenger_count payment_type pickup_datetime pickup_latitude pickup_longitude rate_code store_and_fwd_flag surcharge tip_amount tolls_amount total_amount trip_distance vendor_id year
0 2009-04-21T18:57:09.433767+00:00 40.747950 -73.994712 5.4 2 Cash 2009-04-21T18:51:11.767205+00:00 40.742950 -74.004114 None None 0.0 0.0 0.0 5.4 0.80 CMT 2009
1 2009-01-13T07:50:36.386011+00:00 40.792385 -73.940449 15.4 1 Cash 2009-01-13T07:40:07.639754+00:00 40.747784 -73.996506 None None 0.0 0.0 0.0 15.4 5.40 CMT 2009
2 2009-01-06T19:43:09.908429+00:00 40.768108 -73.944535 6.1 5 CASH 2009-01-06T19:30:44.142187+00:00 40.752070 -73.951340 None None 1.0 0.0 0.0 7.1 1.67 VTS 2009
3 2009-09-24T08:41:56.739991+00:00 40.721812 -73.958122 12.9 1 CASH 2009-09-24T08:28:09.004451+00:00 40.729128 -74.001117 None None 0.5 0.0 0.0 13.4 4.13 VTS 2009
4 2009-10-29T05:57:22.880174+00:00 40.776075 -73.979757 7.7 1 CASH 2009-10-29T05:50:39.214629+00:00 40.756873 -73.976600 None None 0.5 0.0 0.0 8.2 2.03 VTS 2009
In [14]:
vendor.toPandas()
Out[14]:
vendor_id name address city state zip country contact current
0 CMT Creative Mobile Technologies, LLC 950 4th Road Suite 78 Brooklyn NY 11210 USA contactCMT@gmail.com Yes
1 VTS VeriFone Inc 26 Summit St. Flushing NY 11354 USA admin@vtstaxi.com Yes
2 DDS Dependable Driver Service, Inc 8554 North Homestead St. Bronx NY 10472 USA 9778896500 Yes
3 TS Total Solutions Co Five Boroughs Taxi Co. Brooklyn NY 11229 USA mgmt@5btc.com Yes
4 MT Mega Taxi 4 East Jennings St. Brooklyn NY 11228 USA contact@megataxico.com No
In [15]:
payment.limit(5).toPandas()
Out[15]:
A B
0 payment_type payment_lookup
1 Cas Cash
2 CAS Cash
3 Cre Credit
4 CRE Credit

Qual a distância média percorrida por viagens com no máximo 2 passageiros;

In [16]:
media = trips.filter(F.col('passenger_count') <=2).select(F.round(F.mean(F.col('trip_distance')),2)).collect()
print ("[bold]A distância média percorrida por viagens com no máximo [red]2[/red] passageiros é: [/bold]", media[0][0])
A distância média percorrida por viagens com no máximo 2 passageiros é:  2.66

Quais os 3 maiores vendors em quantidade total de dinheiro arrecadado;

Verificando a distribuição de tipos de pagamentos na tabela Trips

In [17]:
trips.groupBy(F.col('payment_type')).count().orderBy(F.col("count").desc()).show()
+------------+-------+
|payment_type|  count|
+------------+-------+
|        CASH|1686164|
|        Cash|1556688|
|      Credit| 703124|
|      CREDIT|  39156|
|   No Charge|  12448|
|     Dispute|   2420|
+------------+-------+

In [18]:
#padronizando a variável payment_type
trips = trips.withColumn("payment_type", F.upper(F.col('payment_type')))

Verificando a distribuição de tipos de pagamentos na tabela Trips após Padronização

In [19]:
trips.groupBy(F.col('payment_type')).count().orderBy(F.col("count").desc()).show()
+------------+-------+
|payment_type|  count|
+------------+-------+
|        CASH|3242852|
|      CREDIT| 742280|
|   NO CHARGE|  12448|
|     DISPUTE|   2420|
+------------+-------+

In [20]:
print ("[bold]Os [red]3[/red] maiores vendors em quantidade total de dinheiro arrecadado são: [/bold]")

name = vendor.select('vendor_id', 'name')

trips.filter(F.col('payment_type') == 'CASH').select(F.col('vendor_id'), F.col('total_amount'))\
     .groupBy(F.col('vendor_id')).agg(F.sum("total_amount").alias('total'))\
     .join(name, 'vendor_id', "inner").orderBy(F.col('total').desc()).select('name', 'total').limit(3).toPandas()
Os 3 maiores vendors em quantidade total de dinheiro arrecadado são: 
Out[20]:
name total
0 Creative Mobile Technologies, LLC 14532005.00
1 VeriFone Inc 13827166.08
2 Dependable Driver Service, Inc 2122999.72

Histograma da distribuição mensal, nos 4 anos, de corridas pagas em dinheiro

In [21]:
def histogram(year):
    df = trips.filter(F.col('payment_type') == 'CASH').filter(F.col('year') == year)\
          .select(F.month(F.col('dropoff_datetime')).alias('month')).orderBy('month').toPandas()

    fig = px.histogram(df, x="month", title='Distribuição Mensal de Corridas pagas em Dinheiro em '+str(year))
    fig.show()
In [22]:
for y in range(2009, 2013):
    histogram(y)

Podemos perceber que sempre nos últimos meses do ano as corridas pagas em dinheiro diminuem

Gráfico de série temporal contando a quantidade de gorjetas de cada dia, nos últimos 3 meses de 2012

In [23]:
df_month = trips.filter(F.col('year') == 2012).filter(F.month('dropoff_datetime')>=10)\
          .select(F.month('dropoff_datetime').alias('month'), F.dayofmonth('dropoff_datetime')\
                  .alias('day'), F.col('tip_amount'))\
          .groupBy(F.col('month'), F.col('day')).agg(F.sum("tip_amount").alias('total_tip'))\
          .withColumn('month', F.col('month').cast(StringType())).toPandas()


fig = px.scatter(df_month, x='day', y='total_tip', color='month',
                 title="Quantidade de Gorjetas por dia nos últimos 3 meses de 2012")

fig.update_layout(
    showlegend=True)


fig.show(config=dict(displayModeBar=False))

Como percebemos olhando o gráfico acima no ano de 2012 só teve corrida com gorjeta até o mês de Outubro, por isso vamos olha agora os três últimos meses que teve corrida no ano de 2012, ou seja, Agosto, Setembro e Outubro

In [24]:
df_month = trips.filter(F.col('year') == 2012).filter(F.month('dropoff_datetime')>=8)\
          .select(F.month('dropoff_datetime').alias('month'), F.dayofmonth('dropoff_datetime')\
                  .alias('day'), F.col('tip_amount'))\
          .groupBy(F.col('month'), F.col('day')).agg(F.sum("tip_amount").alias('total_tip'))\
          .withColumn('month', F.col('month').cast(StringType())).toPandas()


fig = px.scatter(df_month, x='day', y='total_tip', color='month',
                 title="Quantidade de Gorjetas por dia nos meses de Agosto, Setembro e Outubro de 2012")

fig.update_layout(
    showlegend=True)


fig.show(config=dict(displayModeBar=False))

Qual o tempo médio das corridas nos dias de sábado e domingo

Colocando as variáveis dropoff_datetime e pickup_datetime no formato timestamp

In [25]:
trips = trips.withColumn('dropoff_datetime', F.to_timestamp(F.col('dropoff_datetime').cast(StringType())))\
             .withColumn('pickup_datetime', F.to_timestamp(F.col('pickup_datetime').cast(StringType())))

Criando a variável duration que é a diferença do tempo entre as variáveis dropoff_datetime e pickup_datetime

In [26]:
timeFmt = "yyyy-MM-dd'T'HH:mm:ss.SSS"
timeDiff = (F.unix_timestamp('dropoff_datetime', format=timeFmt)
            - F.unix_timestamp('pickup_datetime', format=timeFmt))
In [27]:
trips = trips.withColumn("duration", timeDiff)
In [28]:
media = trips.filter((F.dayofweek('dropoff_datetime') == 7) | (F.dayofweek('dropoff_datetime') == 1))\
             .select(F.round(F.mean(F.col('duration')),2)).collect()

media = round(media[0][0]/60, 2)

print ("[bold]O tempo médio das corridas nos dias de sábado e domingo é: {0} minutos[/bold]".format(media))
O tempo médio das corridas nos dias de sábado e domingo é: 8.75 minutos

Visualização em mapa com latitude e longitude de ​ pickups and ​ dropoffs ​ no ano de 2010

Visualizando somente os 100 inicios e finalizações de corridas mais solicitados

In [29]:
df_off = trips.filter(F.col('year') == 2010).groupBy('dropoff_latitude', 'dropoff_longitude').count()\
             .orderBy(F.col('count').desc()).filter(F.col('dropoff_latitude') != 0.0).toPandas()
df_off['text'] = "Dropoff: "+(df_off['count']).astype(str)

df_up = trips.filter(F.col('year') == 2010).groupBy('pickup_latitude','pickup_longitude').count()\
             .orderBy(F.col('count').desc()).filter(F.col('pickup_latitude') != 0.0).toPandas()
df_up['text'] = "Pickup: "+(df_up['count']).astype(str)

colors = ["darkred", "darkgreen"]

fig = go.Figure()

fig.add_trace(go.Scattergeo(
    locationmode = 'USA-states',
    lon = df_up['pickup_longitude'].values[:100],
    lat = df_up['pickup_latitude'].values[:100],
    text = df_up['text'].values[:100],
    marker = dict(
        size = df_up['count'].values[:100]/2,
        color = colors[1],
        line_color='#2c3e50',
        line_width=0.5,
        sizemode = 'area'
    ),
name = 'Pickup'))

fig.add_trace(go.Scattergeo(
    locationmode = 'USA-states',
    lon = df_off['dropoff_longitude'].values[:100],
    lat = df_off['dropoff_latitude'].values[:100],
    text = df_off['text'].values[:100],
    marker = dict(
        size = df_off['count'].values[:100]/2,
        color = colors[0],
        line_color='rgb(40,40,40)',
        line_width=0.5,
        sizemode = 'area'
    ),
name = 'Dropoff'))

fig.update_layout(
    title = '<b>Latitude e longitude de ​ pickups and ​ dropoffs ​ no ano de 2010 (Os 100 mais frequentes)</b>',
    titlefont = {'family': 'Arial', 'size': 20},
    showlegend = True,
    geo = dict(
        scope = 'usa',
        landcolor = 'rgb(217, 217, 217)',
        projection = {'type':'albers usa'},
        showland = True,
        showlakes = True,
        lakecolor = "#3498db",
        subunitwidth = 1,
        subunitcolor = 'rgb(255, 255, 255)'
    )
)

py.iplot(fig)

Simulando um ​ streaming dos dados

Simulando um streaming dos dados no formato em json utilizando o Spark Streaming em conjunto com o Kafka e o Flume

A aplicação irá realizar consulta a cada 3 minutos

In [30]:
sc = spark.sparkContext
ssc = StreamingContext(sc, 180)
sc.setLogLevel("WARN")
In [31]:
kvs = KafkaUtils.createDirectStream(ssc, ["taxi_trips"],{"metadata.broker.list": 'localhost:9092'})
In [33]:
stream = kvs.map(lambda x: json.loads(x[1]))
In [34]:
stream.count().map(lambda x: "Qtd de linhas lidas: %s" % x).pprint()
In [35]:
payment_stream = stream.map(lambda trips: trips['payment_type'])
payment_count = payment_stream.countByValue()
In [36]:
payment_count.pprint()
In [37]:
ssc.start()
In [38]:
try:
    ssc.awaitTermination()
except KeyboardInterrupt as erro:
    print ("[red bold]Finalizado :tada:")
-------------------------------------------
Time: 2020-07-31 16:09:00
-------------------------------------------
Qtd de linhas lidas: 293420

-------------------------------------------
Time: 2020-07-31 16:09:00
-------------------------------------------
('No Charge', 918)
('Dispute', 164)
('CREDIT', 3012)
('CASH', 123083)
('Credit', 52216)
('Cash', 114027)

-------------------------------------------
Time: 2020-07-31 16:12:00
-------------------------------------------
Qtd de linhas lidas: 85327

-------------------------------------------
Time: 2020-07-31 16:12:00
-------------------------------------------
('Dispute', 48)
('No Charge', 257)
('CREDIT', 775)
('Credit', 15399)
('CASH', 38760)
('Cash', 30088)

-------------------------------------------
Time: 2020-07-31 16:15:00
-------------------------------------------
Qtd de linhas lidas: 119157

-------------------------------------------
Time: 2020-07-31 16:15:00
-------------------------------------------
('No Charge', 341)
('Dispute', 66)
('CREDIT', 1182)
('Credit', 22036)
('CASH', 51875)
('Cash', 43657)

-------------------------------------------
Time: 2020-07-31 16:18:00
-------------------------------------------
Qtd de linhas lidas: 231596

-------------------------------------------
Time: 2020-07-31 16:18:00
-------------------------------------------
('No Charge', 733)
('Dispute', 136)
('CREDIT', 2367)
('Credit', 40864)
('CASH', 96019)
('Cash', 91477)

-------------------------------------------
Time: 2020-07-31 16:21:00
-------------------------------------------
Qtd de linhas lidas: 187580

-------------------------------------------
Time: 2020-07-31 16:21:00
-------------------------------------------
('No Charge', 593)
('Dispute', 132)
('CREDIT', 1872)
('Credit', 31820)
('CASH', 79061)
('Cash', 74102)

-------------------------------------------
Time: 2020-07-31 16:24:00
-------------------------------------------
Qtd de linhas lidas: 203880

-------------------------------------------
Time: 2020-07-31 16:24:00
-------------------------------------------
('Dispute', 132)
('No Charge', 661)
('CREDIT', 1941)
('CASH', 82717)
('Credit', 35129)
('Cash', 83300)

-------------------------------------------
Time: 2020-07-31 16:27:00
-------------------------------------------
Qtd de linhas lidas: 121800

-------------------------------------------
Time: 2020-07-31 16:27:00
-------------------------------------------
('No Charge', 370)
('Dispute', 65)
('CREDIT', 1137)
('Credit', 21685)
('CASH', 50987)
('Cash', 47556)

-------------------------------------------
Time: 2020-07-31 16:30:00
-------------------------------------------
Qtd de linhas lidas: 0

-------------------------------------------
Time: 2020-07-31 16:30:00
-------------------------------------------

Finalizado 🎉